boto3で実行したAthenaの検索結果を、pandasのDataFrameとして受け取る方法
DA事業本部の横山です。
今回はboto3
を用いて実行したAthenaのクエリ結果を、AWS SDK for pandas (awswrangler)
を用いてpandas
のDataFrameとして取得してみました。
前提条件
本記事で利用している各ライブラリのバージョンは以下になります。
boto3
:1.26.135
awswrangler
:3.1.1
pandas
:2.0.1
注意事項
AWS SDK for pandas (awswrangler)
を用いれば、検索の実行や処理状況のポーリング自体もawswrangler
で行えます。しかし本記事ではあくまで検索の実行をboto3
で行い、検索結果の取得,DataFrameへの変換をawswrangler
で行うという内容の記事になっております。
重要な部分
本章移行に実行したコードや出力内容を記載しますが、重要な点は以下です。
- Athenaで検索開始
boto3
を使ってAthenaへ検索を行います
- レスポンス内容から
QueryExecutionId
を取得します- 実際にクエリが実行完了したかをポーリングして確認する必要がありますが、本記事では割愛(コードには載っています)
- Athenaから発行された
QueryExecutionId
を用いて検索結果をDataFrameとして取得するawswrangler
のget_query_results()
を利用する
# Athenaで検索開始 response = athena_cli.start_query_execution( QueryString=sql, QueryExecutionContext={ "Database": database, }, ResultConfiguration={ "OutputLocation": s3_output, }, WorkGroup=workgroup, ) # QueryExecutionIdを取得 queryExecutionId = response.get("QueryExecutionId") response = athena_cli.get_query_execution(QueryExecutionId=queryExecutionId) state = response.get("QueryExecution").get("Status").get("State") # QueryExecutionIdから検索結果をDataFrameで取得(awswrangler) df = get_query_results( query_execution_id=queryExecutionId, )
awswrangler.athena.get_query_results()
のドキュメントは以下になります。
コード内容
import logging import logging.config import time import traceback import boto3 import pandas as pd from awswrangler.athena import get_query_results logging.config.fileConfig( "logging.ini", disable_existing_loggers=False, ) logger = logging.getLogger(__name__) def read_sql_query( athena_cli: str, sql: str, database: str, workgroup: str, s3_output: str, athena_query_wait_polling_delay: float = 0.25, ) -> pd.DataFrame: """Athenaでクエリを実行し、クエリ実行結果をDataFrameで取得する""" try: # クエリを準備 response = athena_cli.start_query_execution( QueryString=sql, QueryExecutionContext={ "Database": database, }, ResultConfiguration={ "OutputLocation": s3_output, }, WorkGroup=workgroup, ) # QueryExecutionIdを取得 queryExecutionId = response.get("QueryExecutionId") # クエリ実行中 while True: response = athena_cli.get_query_execution(QueryExecutionId=queryExecutionId) state = response.get("QueryExecution").get("Status").get("State") if state == "SUCCEEDED": logging.info("SUCCEEDED") break elif state in ("CANCELLED", "FAILED"): logger.info(f"QueryExecution Status is {state}.") raise Exception(f"QueryExecution Status is {state}.") else: time.sleep(athena_query_wait_polling_delay) # クエリ実行時の統計情報を取得 logging.info( f"QueryQueueTimeInMillis: {response.get('QueryExecution').get('Statistics').get('QueryQueueTimeInMillis')}" ) logging.info( f"TotalExecutionTimeInMillis: {response.get('QueryExecution').get('Statistics').get('TotalExecutionTimeInMillis')}" ) logging.info( f"DataScannedInBytes: {response.get('QueryExecution').get('Statistics').get('DataScannedInBytes')}" ) # クエリ実行結果をDataFrameで取得 df = get_query_results( query_execution_id=queryExecutionId, ) except Exception as e: logging.error(e) logging.error(traceback.format_exc()) else: logging.info("OK") finally: return df def main(): database = "test_database" s3_output = f"s3://path/to/athena/output" workgroup = "test_workgroup" sql = """ SELECT * FROM "test_database"."test_member_groups" limit 10; """ athena_cli = boto3.client("athena") df = read_sql_query( athena_cli=athena_cli, sql=sql, database=database, workgroup=workgroup, s3_output=s3_output, ) logger.info(df) if __name__ == "__main__": main()
実行ログ
検索の実行、ポーリングを行い、Athenaの処理結果をDataFrameとして取得できているのがわかります。
利用シーンは限られますが、boto3
でAthenaのクエリ実行を行い結果をpandas
のDataFrameとして取得することができました。
boto3
で取得するAthenaの検索結果は通常JSON形式となりますがDataFrameとして受け取ることで、データ分析やデータの利活用が行いやすくなりますね。
[INFO] athena-query.py read_sql_query 48 SUCCEEDED [INFO] athena-query.py read_sql_query 57 QueryQueueTimeInMillis: 124 [INFO] athena-query.py read_sql_query 60 TotalExecutionTimeInMillis: 588 [INFO] athena-query.py read_sql_query 63 DataScannedInBytes: 142 [INFO] athena-query.py read_sql_query 77 OK [INFO] athena-query.py main 102 test_id test_name 0 test01 テスト01 1 test02 テスト02 2 test03 テスト03 3 test04 テスト04 4 test05 テスト05
おわりに
稀なケースではありますが、boto3
を用いて実行したAthenaのクエリ結果をAWS SDK for pandas (awswrangler)
を用いてpandas
のDataFrameとして取得してみました。
以上になります。この記事がどなたかの助けになれば幸いです。